{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# How to stream runnables\n",
    "\n",
    "- Author: [byoon](https://github.com/acho98)\n",
    "- Peer Review : [seofield](https://github.com/seofield), [stsr1284](https://github.com/stsr1284)\n",
    "- Proofread : [Q0211](https://github.com/Q0211)\n",
    "- This is a part of [LangChain Open Tutorial](https://github.com/LangChain-OpenTutorial/LangChain-OpenTutorial)\n",
    "\n",
    "[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/LangChain-OpenTutorial/LangChain-OpenTutorial/blob/main/13-LangChain-Expression-Language/14-StreamRunnables.ipynb)[![Open in GitHub](https://img.shields.io/badge/Open%20in%20GitHub-181717?style=flat-square&logo=github&logoColor=white)](https://github.com/LangChain-OpenTutorial/LangChain-OpenTutorial/blob/main/13-LangChain-Expression-Language/14-StreamRunnables.ipynb)\n",
    "## Overview\n",
    "\n",
    "Streaming is critical in making applications based on LLMs feel responsive to end-users.\n",
    "\n",
    "Important LangChain primitives like chat models, output parsers, prompts, retrievers, and agents implement the LangChain Runnable Interface.\n",
    "\n",
    "This interface provides two general approaches to stream content:\n",
    "\n",
    "1. sync stream and async astream: \n",
    "    - a default implementation of streaming that streams the final output from the chain.\n",
    "\n",
    "2. async astream_events and async astream_log: \n",
    "    - these provide a way to stream both intermediate steps and final output from the chain.\n",
    "\n",
    "Let's explore both approaches, and try to understand how to use them.\n",
    "\n",
    "\n",
    "\n",
    "\n",
    "### Table of Contents\n",
    "\n",
    "- [Overview](#overview)\n",
    "- [Environement Setup](#environment-setup)\n",
    "- [Using Stream](#using-stream)\n",
    "- [Chains](#chains)\n",
    "- [Streaming with Parsers](#streaming-with-parsers)\n",
    "- [Working with Input Streams](#working-with-input-streams)\n",
    "- [Non-Streaming Components](#non-streaming-components)\n",
    "- [Using Stream Events](#using-stream-events)\n",
    "- [Chat Model](#chat-model)\n",
    "- [Filtering Events](#filtering-events)\n",
    "- [Handling Non-Streaming Components](#handling-non-streaming-components)\n",
    "- [Propagating Callbacks](#propagating-callbacks)\n",
    "\n",
    "### References\n",
    "- [Langchain Conceptual Guide > Streaming](https://python.langchain.com/docs/concepts/streaming/)\n",
    "- [LangChain Conceptual Guide > Runnable interface](https://python.langchain.com/docs/concepts/runnables/)\n",
    "----"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Environment Setup\n",
    "\n",
    "Set up the environment. You may refer to [Environment Setup](https://wikidocs.net/257836) for more details.\n",
    "\n",
    "**[Note]**\n",
    "- ```langchain-opentutorial``` is a package that provides a set of easy-to-use environment setup, useful functions and utilities for tutorials. \n",
    "- You can checkout the [```langchain-opentutorial```](https://github.com/LangChain-OpenTutorial/langchain-opentutorial-pypi) for more details."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%capture --no-stderr\n",
    "%pip install langchain-opentutorial"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Install required packages\n",
    "from langchain_opentutorial import package\n",
    "\n",
    "package.install(\n",
    "    [\n",
    "        \"langchain\",\n",
    "        \"langchain_community\",\n",
    "        \"langchain_openai\",\n",
    "        \"langchain_anthropic\",\n",
    "    ],\n",
    "    verbose=False,\n",
    "    upgrade=False,\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Environment variables have been set successfully.\n"
     ]
    }
   ],
   "source": [
    "# Set environment variables\n",
    "from langchain_opentutorial import set_env\n",
    "\n",
    "set_env(\n",
    "    {\n",
    "        \"OPENAI_API_KEY\": \"\",\n",
    "        \"ANTHROPIC_API_KEY\": \"\",\n",
    "    }\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "False"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from dotenv import load_dotenv\n",
    "\n",
    "load_dotenv(override=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Using Stream\n",
    "All Runnable objects provide two stream methods:\n",
    "\n",
    "- Synchronous (sync): stream\n",
    "- Asynchronous (async): astream\n",
    "\n",
    "These methods are designed to process the final output in small chunks, returning each chunk as soon as it is available.\n",
    "\n",
    "**How Streaming Works**\n",
    "\n",
    "Streaming is only possible when every step in the program processes input data one chunk at a time and yields the corresponding output chunk.\n",
    "\n",
    "The complexity of processing can vary, such as:\n",
    "\n",
    "- Simple tasks: Emitting tokens generated by an LLM one by one.\n",
    "- Complex tasks: Streaming parts of a JSON result before the entire JSON is completed.\n",
    "\n",
    "The best way to explore streaming is to focus on the most critical component in LLM-based apps: the LLMs themselves.\n",
    "\n",
    "### LLMs and Chat Models\n",
    "LLMs and chat models are the primary bottlenecks in LLM-based applications.\n",
    "\n",
    "LLMs can take several seconds to generate a complete response to a query.   \n",
    "This is much slower than the 200-300ms threshold at which users perceive an application as responsive.\n",
    "\n",
    "**Solution: Use Streaming to Improve Responsiveness**\n",
    "\n",
    "To reduce waiting time for users, it's essential to show intermediate progress.   \n",
    "This can be achieved by streaming the model’s output token by token and displaying it immediately to the user.\n",
    "\n",
    "Let’s explore an example of streaming using a chat model. Choose one of the options below to get started!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "from langchain_anthropic import ChatAnthropic\n",
    "\n",
    "model = ChatAnthropic(model=\"claude-3-5-sonnet-latest\", temperature=0)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's start with the sync stream API"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "|Polar bears actually| have transparent, hollow fur, not| white fur as commonly believed. Their skin| is black underneath. The fur appears| white because the transparent hair scatters an|d reflects visible light, creating the white appearance we| see. This helps them blend in with their| snowy Arctic environment. The hollow| nature of their fur also helps trap air for| insulation.||"
     ]
    }
   ],
   "source": [
    "chunks = []\n",
    "for chunk in model.stream(\"What color is a polar bear?\"):\n",
    "    chunks.append(chunk)\n",
    "    print(chunk.content, end=\"|\", flush=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Alternatively, if you're working in an async environment, you may consider using the async astream API"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "|Polar bears actually| have transparent, hollow fur, not white| fur as commonly believed. Their skin| is black underneath. The| fur appears white because the transparent hair| scatters and reflects visible light, creating| the white appearance we see.| This helps them blend in with their| snowy Arctic environment. The| hollow nature of their fur also helps trap| air for insulation.||"
     ]
    }
   ],
   "source": [
    "chunks = []\n",
    "async for chunk in model.astream(\"What color is a polar bear?\"):\n",
    "    chunks.append(chunk)\n",
    "    print(chunk.content, end=\"|\", flush=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's inspect one of the chunks"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-2611aa2c-0556-4ea6-b622-cf8010df2785', usage_metadata={'input_tokens': 14, 'output_tokens': 4, 'total_tokens': 18, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "chunks[0]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We got back something called an AIMessageChunk. This chunk represents a part of an AIMessage.\n",
    "\n",
    "Message chunks are additive by design -- one can simply add them up to get the state of the response so far"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "AIMessageChunk(content='Polar bears actually have transparent, hollow fur, not white', additional_kwargs={}, response_metadata={}, id='run-2611aa2c-0556-4ea6-b622-cf8010df2785', usage_metadata={'input_tokens': 14, 'output_tokens': 4, 'total_tokens': 18, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "chunks[0] + chunks[1] + chunks[2]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "AIMessageChunk(content='Polar bears actually have transparent, hollow fur, not white fur as commonly believed. Their skin is black underneath. The fur appears white because the transparent hair', additional_kwargs={}, response_metadata={}, id='run-2611aa2c-0556-4ea6-b622-cf8010df2785', usage_metadata={'input_tokens': 14, 'output_tokens': 4, 'total_tokens': 18, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "chunks[0] + chunks[1] + chunks[2] + chunks[3] + chunks[4] + chunks[5]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Chains\n",
    "Most LLM applications involve multiple steps, not just a simple call to a language model.\n",
    "\n",
    "Here, we’ll create a simple chain using the LangChain Expression Language (LCEL).  \n",
    "This chain will combine a prompt, a model, and a parser, and we’ll verify that streaming works.\n",
    "\n",
    "Example: Using a Parser for Streaming\n",
    "We’ll use the ```StrOutputParser``` to process the output from the model.   \n",
    "This parser extracts the ```content``` field from an ```AIMessageChunk```, which gives us each ```token``` generated by the model.\n",
    "\n",
    "### What is LCEL?\n",
    "LCEL is a declarative way to define a \"program\" by linking together different components (called LangChain primitives).\n",
    "\n",
    "Chains created with LCEL have some advantages:\n",
    "\n",
    "- Built-in streaming: They automatically support stream and astream, allowing the final output to be streamed in chunks.\n",
    "- Standard Runnable interface: Chains built using LCEL fully implement this interface, making them flexible and easy to use."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Streaming with Parsers\n",
    "Even though we’re using a ```parser``` at the end of the chain, streaming still works.\n",
    "\n",
    "This is because the ```parser``` processes each streaming chunk individually, instead of waiting for the full output.   \n",
    "Many LCEL components also support this kind of chunk-by-chunk processing, which makes it easier to build applications that need streaming.\n",
    "\n",
    "You can create custom functions that return generators to handle streaming data effectively.\n",
    "\n",
    "Certain components, like prompt templates or chat models, don’t process individual chunks.   \n",
    "Instead, they collect all data from previous steps before proceeding.   \n",
    "This behavior can interrupt the streaming process, so keep it in mind when designing your chain.\n",
    "\n",
    "LCEL gives you flexibility to decide how your chain operates (e.g., sync/async, batch/streaming).  \n",
    "If LCEL doesn’t fit your needs, you can always use a standard, step-by-step approach:\n",
    "\n",
    "- Call invoke, batch, or stream on each component.\n",
    "- Assign the results to variables.\n",
    "- Use those variables in the next steps as needed.  \n",
    "This lets you switch between a declarative or imperative style depending on what works best for your project.\n",
    "\n",
    "Now, let’s get started with creating a simple chain!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "|Here's an| apple joke for you:\n",
      "\n",
      "Why| did the apple go to the| doctor?\n",
      "Because it wasn|'t peeling well! \n",
      "\n",
      "Here|'s another one:\n",
      "What kin|d of apple has a short temper?\n",
      "A| crab apple! |🍎||"
     ]
    }
   ],
   "source": [
    "from langchain_core.output_parsers import StrOutputParser\n",
    "from langchain_core.prompts import ChatPromptTemplate\n",
    "\n",
    "prompt = ChatPromptTemplate.from_template(\"tell me a joke about {topic}\")\n",
    "parser = StrOutputParser()\n",
    "chain = prompt | model | parser\n",
    "\n",
    "async for chunk in chain.astream({\"topic\": \"apple\"}):\n",
    "    print(chunk, end=\"|\", flush=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Working with Input Streams\n",
    "What if you wanted to stream JSON as it’s being generated?\n",
    "\n",
    "If you use json.loads to parse partial JSON, it will fail because partial JSON is not valid JSON.   \n",
    "At first, this might seem impossible to solve, and you might assume that streaming JSON isn’t doable.\n",
    "\n",
    "It turns out there is a way!\n",
    "\n",
    "- The parser needs to process the input stream and try to \"auto-complete\" the partial JSON to make it valid.\n",
    "- This allows the JSON to be processed and streamed chunk by chunk.\n",
    "\n",
    "To better understand how this works, we’ll explore an example of a parser that can handle partial JSON streams effectively."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{}\n",
      "{'countries': [{}]}\n",
      "{'countries': [{'name': 'France'}]}\n",
      "{'countries': [{'name': 'France', 'population': 67391582}]}\n",
      "{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain'}]}\n",
      "{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615}]}\n",
      "{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615034}, {}]}\n",
      "{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615034}, {'name': 'Korea'}]}\n",
      "{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615034}, {'name': 'Korea', 'population': 51744876}]}\n"
     ]
    }
   ],
   "source": [
    "from langchain_core.output_parsers import JsonOutputParser\n",
    "\n",
    "chain = (\n",
    "    model | JsonOutputParser()\n",
    ")  # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models\n",
    "async for text in chain.astream(\n",
    "    \"output a list of the countries france, spain and korea and their populations in JSON format. \"\n",
    "    'Use a dict with an outer key of \"countries\" which contains a list of countries. '\n",
    "    \"Each country should have the key `name` and `population`\"\n",
    "):\n",
    "    print(text, flush=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now, let's break streaming. We'll use the previous example and append an extraction function at the end that extracts the country names from the finalized JSON.\n",
    "\n",
    "Why Does This Break Streaming?\n",
    "- Any step in the chain that processes finalized inputs (rather than handling input streams) can disrupt the streaming process.\n",
    "- When this happens, both ```stream``` and ```astream``` functionalities stop working properly.\n",
    "\n",
    "Later, we will discuss the ```astream_events``` API which streams results from intermediate steps.   \n",
    "This API will stream results from intermediate steps even if the chain contains steps that only operate on finalized inputs."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "['France', 'Spain', 'Korea']|"
     ]
    }
   ],
   "source": [
    "from langchain_core.output_parsers import (\n",
    "    JsonOutputParser,\n",
    ")\n",
    "\n",
    "# A function that operates on finalized inputs\n",
    "# rather than on an input_stream\n",
    "def _extract_country_names(inputs):\n",
    "    \"\"\"A function that does not operates on input streams and breaks streaming.\"\"\"\n",
    "    if not isinstance(inputs, dict):\n",
    "        return []\n",
    "\n",
    "    countries = inputs.get(\"countries\", [])\n",
    "\n",
    "    if not isinstance(countries, list):\n",
    "        return []\n",
    "\n",
    "    country_names = [\n",
    "        country.get(\"name\") for country in countries if isinstance(country, dict)\n",
    "    ]\n",
    "    return country_names\n",
    "\n",
    "chain = model | JsonOutputParser() | _extract_country_names\n",
    "\n",
    "async for text in chain.astream(\n",
    "    \"output a list of the countries france, spain and korea and their populations in JSON format. \"\n",
    "    'Use a dict with an outer key of \"countries\" which contains a list of countries. '\n",
    "    \"Each country should have the key `name` and `population`\"\n",
    "):\n",
    "    print(text, end=\"|\", flush=True)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Generator Functions**\n",
    "\n",
    "Let’s fix the streaming issue by using a generator function.\n",
    "A generator function allows you to process input streams efficiently by using the yield keyword.\n",
    "\n",
    "Why Use a Generator Function?\n",
    "- A generator function processes input streams piece by piece.\n",
    "- It’s a simple way to handle streaming data without waiting for the entire input to be completed.\n",
    "\n",
    "A generator function (a function that uses yield) makes it easier to write code that works with streaming inputs. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "France|Spain|Korea|"
     ]
    }
   ],
   "source": [
    "from langchain_core.output_parsers import JsonOutputParser\n",
    "\n",
    "async def _extract_country_names_streaming(input_stream):\n",
    "    \"\"\"A function that operates on input streams.\"\"\"\n",
    "    country_names_so_far = set()\n",
    "\n",
    "    async for input in input_stream:\n",
    "        if not isinstance(input, dict):\n",
    "            continue\n",
    "\n",
    "        if \"countries\" not in input:\n",
    "            continue\n",
    "\n",
    "        countries = input[\"countries\"]\n",
    "\n",
    "        if not isinstance(countries, list):\n",
    "            continue\n",
    "\n",
    "        for country in countries:\n",
    "            name = country.get(\"name\")\n",
    "            if not name:\n",
    "                continue\n",
    "            if name not in country_names_so_far:\n",
    "                yield name\n",
    "                country_names_so_far.add(name)\n",
    "\n",
    "\n",
    "chain = model | JsonOutputParser() | _extract_country_names_streaming\n",
    "\n",
    "async for text in chain.astream(\n",
    "    \"output a list of the countries france, spain and korea and their populations in JSON format. \"\n",
    "    'Use a dict with an outer key of \"countries\" which contains a list of countries. '\n",
    "    \"Each country should have the key `name` and `population`\",\n",
    "):\n",
    "    print(text, end=\"|\", flush=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Because the code above is relying on JSON auto-completion, you may see partial names of countries (e.g., Sp and Spain), which is not what one would want for an extraction result!\n",
    "\n",
    "We're focusing on streaming concepts, not necessarily the results of the chains. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Non-Streaming Components\n",
    "\n",
    "Some built-in components like Retrievers do not offer any streaming. What happens if we try to stream them? 🤨"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[[Document(id='a5d7c82a-e94f-4baa-8822-605c0b987dc7', metadata={}, page_content='harrison worked at langchain'),\n",
       "  Document(id='6e7a681c-8208-4c2f-9015-4e64be453461', metadata={}, page_content='harrison likes spicy food')]]"
      ]
     },
     "execution_count": 14,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from langchain_community.vectorstores import FAISS\n",
    "from langchain_core.output_parsers import StrOutputParser\n",
    "from langchain_core.prompts import ChatPromptTemplate\n",
    "from langchain_core.runnables import RunnablePassthrough\n",
    "from langchain_openai import OpenAIEmbeddings\n",
    "\n",
    "template = \"\"\"Answer the question based only on the following context:\n",
    "{context}\n",
    "\n",
    "Question: {question}\n",
    "\"\"\"\n",
    "prompt = ChatPromptTemplate.from_template(template)\n",
    "\n",
    "vectorstore = FAISS.from_texts(\n",
    "    [\"harrison worked at langchain\", \"harrison likes spicy food\"],\n",
    "    embedding=OpenAIEmbeddings(),\n",
    ")\n",
    "retriever = vectorstore.as_retriever()\n",
    "\n",
    "chunks = [chunk for chunk in retriever.stream(\"where did harrison work?\")]\n",
    "chunks"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Stream just yielded the final result from that component.\n",
    "\n",
    "This is OK 🥹! Not all components have to implement streaming -- in some cases streaming is either unnecessary, difficult or just doesn't make sense.\n",
    "\n",
    "An LCEL chain constructed using non-streaming components, will still be able to stream in a lot of cases, with streaming of partial output starting after the last non-streaming step in the chain."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [],
   "source": [
    "retrieval_chain = (\n",
    "    {\n",
    "        \"context\": retriever.with_config(run_name=\"Docs\"),\n",
    "        \"question\": RunnablePassthrough(),\n",
    "    }\n",
    "    | prompt\n",
    "    | model\n",
    "    | StrOutputParser()\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "|According| to the context, Harrison worked at| Langchain.\n",
      "\n",
      "Here are |3 made up sentences about Langchain|:\n",
      "1. Langchain| is a bustling tech company locate|d in a modern office building with floor|-to-ceiling windows.\n",
      "2. The| company's collaborative workspace features comfortable loun|ges where employees often brainstorm innovative| ideas.\n",
      "3. Langchain's office| is known for its well-stocked coffee| bar and weekly team-building activities.||"
     ]
    }
   ],
   "source": [
    "for chunk in retrieval_chain.stream(\n",
    "    \"Where did harrison work? \" \"Write 3 made up sentences about this place.\"\n",
    "):\n",
    "    print(chunk, end=\"|\", flush=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now that we've seen how stream and astream work, let's venture into the world of streaming events. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Using Stream Events\n",
    "Stream Events is a beta API designed for handling event-based streaming in LangChain. Since it’s still in development, it may undergo changes based on user feedback.\n",
    "- This guide uses the V2 API, which requires langchain-core >= 0.2.\n",
    "- For the V1 API compatible with older versions of LangChain, see [here](https://python.langchain.com/v0.1/docs/expression_language/streaming/#using-stream-events)\n",
    "\n",
    "To ensure the ```astream_events``` API works correctly, follow these guidelines\n",
    "1. Use async Across the Code:\n",
    "    - Wherever possible, use ```async``` functions and tools throughout your code to maintain compatibility with the API.\n",
    "2. Propagate Callbacks:\n",
    "    - If you’re defining custom functions or runnable objects, make sure to propagate callbacks properly so that events can be captured and processed.\n",
    "3. Force LLMs to Stream Tokens:\n",
    "    - When using runnables without LCEL, call ```.astream()``` on LLMs instead of ```.ainvoke``` to ensure tokens are streamed incrementally.\n",
    "4. Test and Report Issues:\n",
    "    - If anything doesn’t work as expected, provide feedback to improve the API.\n",
    "\n",
    "### Event Reference\n",
    "When streaming is implemented properly, the inputs to a runnable will not be known until after the input stream has been entirely consumed.   \n",
    "This means that ```inputs``` will often be included only for ```end``` events and rather than for ```start``` events. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'0.3.28'"
      ]
     },
     "execution_count": 17,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import langchain_core\n",
    "\n",
    "langchain_core.__version__"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Chat Model\n",
    "Let's start off by looking at the events produced by a chat model.\n",
    "\n",
    "**What's with the version=\"v2\" parameter? 😾**\n",
    "- The version=\"v2\" parameter is there because this is a beta API. Since we’re still making changes to it, this parameter helps avoid breaking your code in the future.\n",
    "\n",
    "💡 Note: v2 is only supported in langchain-core >= 0.2.0."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [],
   "source": [
    "events = []\n",
    "async for event in model.astream_events(\"hello\", version=\"v2\"):\n",
    "    events.append(event)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's take a look at the few of the start event and a few of the end events."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[{'event': 'on_chat_model_start',\n",
       "  'data': {'input': 'hello'},\n",
       "  'name': 'ChatAnthropic',\n",
       "  'tags': [],\n",
       "  'run_id': '42b66300-54db-4f6d-82b8-8b7fe89ca077',\n",
       "  'metadata': {'ls_provider': 'anthropic',\n",
       "   'ls_model_name': 'claude-3-5-sonnet-latest',\n",
       "   'ls_model_type': 'chat',\n",
       "   'ls_temperature': 0.0,\n",
       "   'ls_max_tokens': 1024},\n",
       "  'parent_ids': []},\n",
       " {'event': 'on_chat_model_stream',\n",
       "  'run_id': '42b66300-54db-4f6d-82b8-8b7fe89ca077',\n",
       "  'name': 'ChatAnthropic',\n",
       "  'tags': [],\n",
       "  'metadata': {'ls_provider': 'anthropic',\n",
       "   'ls_model_name': 'claude-3-5-sonnet-latest',\n",
       "   'ls_model_type': 'chat',\n",
       "   'ls_temperature': 0.0,\n",
       "   'ls_max_tokens': 1024},\n",
       "  'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-42b66300-54db-4f6d-82b8-8b7fe89ca077', usage_metadata={'input_tokens': 8, 'output_tokens': 1, 'total_tokens': 9, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})},\n",
       "  'parent_ids': []},\n",
       " {'event': 'on_chat_model_stream',\n",
       "  'run_id': '42b66300-54db-4f6d-82b8-8b7fe89ca077',\n",
       "  'name': 'ChatAnthropic',\n",
       "  'tags': [],\n",
       "  'metadata': {'ls_provider': 'anthropic',\n",
       "   'ls_model_name': 'claude-3-5-sonnet-latest',\n",
       "   'ls_model_type': 'chat',\n",
       "   'ls_temperature': 0.0,\n",
       "   'ls_max_tokens': 1024},\n",
       "  'data': {'chunk': AIMessageChunk(content='Hi', additional_kwargs={}, response_metadata={}, id='run-42b66300-54db-4f6d-82b8-8b7fe89ca077')},\n",
       "  'parent_ids': []}]"
      ]
     },
     "execution_count": 19,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "events[:3]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[{'event': 'on_chat_model_stream',\n",
       "  'run_id': '42b66300-54db-4f6d-82b8-8b7fe89ca077',\n",
       "  'name': 'ChatAnthropic',\n",
       "  'tags': [],\n",
       "  'metadata': {'ls_provider': 'anthropic',\n",
       "   'ls_model_name': 'claude-3-5-sonnet-latest',\n",
       "   'ls_model_type': 'chat',\n",
       "   'ls_temperature': 0.0,\n",
       "   'ls_max_tokens': 1024},\n",
       "  'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={'stop_reason': 'end_turn', 'stop_sequence': None}, id='run-42b66300-54db-4f6d-82b8-8b7fe89ca077', usage_metadata={'input_tokens': 0, 'output_tokens': 12, 'total_tokens': 12, 'input_token_details': {}})},\n",
       "  'parent_ids': []},\n",
       " {'event': 'on_chat_model_end',\n",
       "  'data': {'output': AIMessageChunk(content='Hi! How can I help you today?', additional_kwargs={}, response_metadata={'stop_reason': 'end_turn', 'stop_sequence': None}, id='run-42b66300-54db-4f6d-82b8-8b7fe89ca077', usage_metadata={'input_tokens': 8, 'output_tokens': 13, 'total_tokens': 21, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})},\n",
       "  'run_id': '42b66300-54db-4f6d-82b8-8b7fe89ca077',\n",
       "  'name': 'ChatAnthropic',\n",
       "  'tags': [],\n",
       "  'metadata': {'ls_provider': 'anthropic',\n",
       "   'ls_model_name': 'claude-3-5-sonnet-latest',\n",
       "   'ls_model_type': 'chat',\n",
       "   'ls_temperature': 0.0,\n",
       "   'ls_max_tokens': 1024},\n",
       "  'parent_ids': []}]"
      ]
     },
     "execution_count": 20,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "events[-2:]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's revisit the example chain that parsed streaming JSON to explore the streaming events API."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [],
   "source": [
    "chain = (\n",
    "    model | JsonOutputParser()\n",
    ") \n",
    "\n",
    "events = [\n",
    "    event\n",
    "    async for event in chain.astream_events(\n",
    "        \"output a list of the countries france, spain and korea and their populations in JSON format. \"\n",
    "        'Use a dict with an outer key of \"countries\" which contains a list of countries. '\n",
    "        \"Each country should have the key `name` and `population`\",\n",
    "        version=\"v2\",\n",
    "    )\n",
    "]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If you examine at the first few events, you'll notice that there are 3 different start events rather than 2 start events.\n",
    "\n",
    "The three start events correspond to:\n",
    "\n",
    "1. The chain (model + parser)\n",
    "2. The model\n",
    "3. The parser"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[{'event': 'on_chain_start',\n",
       "  'data': {'input': 'output a list of the countries france, spain and korea and their populations in JSON format. Use a dict with an outer key of \"countries\" which contains a list of countries. Each country should have the key `name` and `population`'},\n",
       "  'name': 'RunnableSequence',\n",
       "  'tags': [],\n",
       "  'run_id': '4b40eb58-73e1-4b7a-9b8f-3caa7530dce2',\n",
       "  'metadata': {},\n",
       "  'parent_ids': []},\n",
       " {'event': 'on_chat_model_start',\n",
       "  'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and korea and their populations in JSON format. Use a dict with an outer key of \"countries\" which contains a list of countries. Each country should have the key `name` and `population`', additional_kwargs={}, response_metadata={})]]}},\n",
       "  'name': 'ChatAnthropic',\n",
       "  'tags': ['seq:step:1'],\n",
       "  'run_id': 'a1c23e6f-6dd1-4de6-b41a-e518d2a3d54f',\n",
       "  'metadata': {'ls_provider': 'anthropic',\n",
       "   'ls_model_name': 'claude-3-5-sonnet-latest',\n",
       "   'ls_model_type': 'chat',\n",
       "   'ls_temperature': 0.0,\n",
       "   'ls_max_tokens': 1024},\n",
       "  'parent_ids': ['4b40eb58-73e1-4b7a-9b8f-3caa7530dce2']},\n",
       " {'event': 'on_chat_model_stream',\n",
       "  'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-a1c23e6f-6dd1-4de6-b41a-e518d2a3d54f', usage_metadata={'input_tokens': 56, 'output_tokens': 1, 'total_tokens': 57, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})},\n",
       "  'run_id': 'a1c23e6f-6dd1-4de6-b41a-e518d2a3d54f',\n",
       "  'name': 'ChatAnthropic',\n",
       "  'tags': ['seq:step:1'],\n",
       "  'metadata': {'ls_provider': 'anthropic',\n",
       "   'ls_model_name': 'claude-3-5-sonnet-latest',\n",
       "   'ls_model_type': 'chat',\n",
       "   'ls_temperature': 0.0,\n",
       "   'ls_max_tokens': 1024},\n",
       "  'parent_ids': ['4b40eb58-73e1-4b7a-9b8f-3caa7530dce2']}]"
      ]
     },
     "execution_count": 23,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "events[:3]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "What do you think you'd see if you looked at the last 3 events? what about the middle?   \n",
    "Let's use this API to take output the stream events from the model and the parser. We're ignoring start events, end events and events from the chain.\n",
    "\n",
    "both the model and the parser support streaming, we see streaming events from both components in real time! Kind of cool isn't it? 🦜\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Chat model chunk: ''\n",
      "Chat model chunk: '{'\n",
      "Parser chunk: {}\n",
      "Chat model chunk: '\\n  \"countries\":'\n",
      "Chat model chunk: ' [\\n    {\\n      \"name\": \"'\n",
      "Parser chunk: {'countries': [{'name': ''}]}\n",
      "Chat model chunk: 'France\",\\n      \"'\n",
      "Parser chunk: {'countries': [{'name': 'France'}]}\n",
      "Chat model chunk: 'population\": 67391582\\n    },'\n",
      "Parser chunk: {'countries': [{'name': 'France', 'population': 67391582}]}\n",
      "Chat model chunk: '\\n    {\\n      \"name\": \"Spain\",'\n",
      "Parser chunk: {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain'}]}\n",
      "Chat model chunk: '\\n      \"population\": 47615'\n",
      "Parser chunk: {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615}]}\n",
      "Chat model chunk: '034\\n    },\\n    '\n",
      "Parser chunk: {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615034}]}\n",
      "Chat model chunk: '{\\n      \"name\": \"Korea\",'\n",
      "Parser chunk: {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615034}, {'name': 'Korea'}]}\n",
      "Chat model chunk: '\\n      \"population\": 51744'\n",
      "Parser chunk: {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615034}, {'name': 'Korea', 'population': 51744}]}\n",
      "...\n"
     ]
    }
   ],
   "source": [
    "num_events = 0\n",
    "\n",
    "async for event in chain.astream_events(\n",
    "    \"output a list of the countries france, spain and korea and their populations in JSON format. \"\n",
    "    'Use a dict with an outer key of \"countries\" which contains a list of countries. '\n",
    "    \"Each country should have the key `name` and `population`\",\n",
    "    version=\"v2\",\n",
    "):\n",
    "    kind = event[\"event\"]\n",
    "    if kind == \"on_chat_model_stream\":\n",
    "        print(\n",
    "            f\"Chat model chunk: {repr(event['data']['chunk'].content)}\",\n",
    "            flush=True,\n",
    "        )\n",
    "    if kind == \"on_parser_stream\":\n",
    "        print(f\"Parser chunk: {event['data']['chunk']}\", flush=True)\n",
    "    num_events += 1\n",
    "    if num_events > 30:\n",
    "        # Truncate the output\n",
    "        print(\"...\")\n",
    "        break"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Filtering Events\n",
    "Because this API produces so many events, it is useful to be able to filter on events.\n",
    "\n",
    "You can filter by either component name, component tags or component type.\n",
    "\n",
    "### By Name"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'event': 'on_parser_start', 'data': {'input': 'output a list of the countries france, spain and korea and their populations in JSON format. Use a dict with an outer key of \"countries\" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'my_parser', 'tags': ['seq:step:2'], 'run_id': 'ddad9ed0-d333-45c1-8150-8297c2c981da', 'metadata': {}, 'parent_ids': ['56501f2f-f301-4617-b531-27c3c1cdbb66']}\n",
      "{'event': 'on_parser_stream', 'run_id': 'ddad9ed0-d333-45c1-8150-8297c2c981da', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {}}, 'parent_ids': ['56501f2f-f301-4617-b531-27c3c1cdbb66']}\n",
      "{'event': 'on_parser_stream', 'run_id': 'ddad9ed0-d333-45c1-8150-8297c2c981da', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{}]}}, 'parent_ids': ['56501f2f-f301-4617-b531-27c3c1cdbb66']}\n",
      "{'event': 'on_parser_stream', 'run_id': 'ddad9ed0-d333-45c1-8150-8297c2c981da', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France'}]}}, 'parent_ids': ['56501f2f-f301-4617-b531-27c3c1cdbb66']}\n",
      "{'event': 'on_parser_stream', 'run_id': 'ddad9ed0-d333-45c1-8150-8297c2c981da', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67391582}]}}, 'parent_ids': ['56501f2f-f301-4617-b531-27c3c1cdbb66']}\n",
      "{'event': 'on_parser_stream', 'run_id': 'ddad9ed0-d333-45c1-8150-8297c2c981da', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain'}]}}, 'parent_ids': ['56501f2f-f301-4617-b531-27c3c1cdbb66']}\n",
      "{'event': 'on_parser_stream', 'run_id': 'ddad9ed0-d333-45c1-8150-8297c2c981da', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615}]}}, 'parent_ids': ['56501f2f-f301-4617-b531-27c3c1cdbb66']}\n",
      "{'event': 'on_parser_stream', 'run_id': 'ddad9ed0-d333-45c1-8150-8297c2c981da', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615034}]}}, 'parent_ids': ['56501f2f-f301-4617-b531-27c3c1cdbb66']}\n",
      "{'event': 'on_parser_stream', 'run_id': 'ddad9ed0-d333-45c1-8150-8297c2c981da', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615034}, {'name': 'Korea'}]}}, 'parent_ids': ['56501f2f-f301-4617-b531-27c3c1cdbb66']}\n",
      "{'event': 'on_parser_stream', 'run_id': 'ddad9ed0-d333-45c1-8150-8297c2c981da', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615034}, {'name': 'Korea', 'population': 51744876}]}}, 'parent_ids': ['56501f2f-f301-4617-b531-27c3c1cdbb66']}\n",
      "{'event': 'on_parser_end', 'data': {'output': {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615034}, {'name': 'Korea', 'population': 51744876}]}}, 'run_id': 'ddad9ed0-d333-45c1-8150-8297c2c981da', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'parent_ids': ['56501f2f-f301-4617-b531-27c3c1cdbb66']}\n",
      "...\n"
     ]
    }
   ],
   "source": [
    "chain = model.with_config({\"run_name\": \"model\"}) | JsonOutputParser().with_config(\n",
    "    {\"run_name\": \"my_parser\"}\n",
    ")\n",
    "\n",
    "max_events = 0\n",
    "async for event in chain.astream_events(\n",
    "    \"output a list of the countries france, spain and korea and their populations in JSON format. \"\n",
    "    'Use a dict with an outer key of \"countries\" which contains a list of countries. '\n",
    "    \"Each country should have the key `name` and `population`\",\n",
    "    version=\"v2\",\n",
    "    include_names=[\"my_parser\"],\n",
    "):\n",
    "    print(event)\n",
    "    max_events += 1\n",
    "    if max_events > 10:\n",
    "        # Truncate output\n",
    "        print(\"...\")\n",
    "        break"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### By Type"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'event': 'on_chat_model_start', 'data': {'input': 'output a list of the countries france, spain and korea and their populations in JSON format. Use a dict with an outer key of \"countries\" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'model', 'tags': ['seq:step:1'], 'run_id': '75c6e43d-c883-4e23-b7e4-bd8be694f1c0', 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['b2377705-61a4-420c-a9aa-d6e1ade3d11d']}\n",
      "{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-75c6e43d-c883-4e23-b7e4-bd8be694f1c0', usage_metadata={'input_tokens': 56, 'output_tokens': 1, 'total_tokens': 57, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})}, 'run_id': '75c6e43d-c883-4e23-b7e4-bd8be694f1c0', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['b2377705-61a4-420c-a9aa-d6e1ade3d11d']}\n",
      "{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='{', additional_kwargs={}, response_metadata={}, id='run-75c6e43d-c883-4e23-b7e4-bd8be694f1c0')}, 'run_id': '75c6e43d-c883-4e23-b7e4-bd8be694f1c0', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['b2377705-61a4-420c-a9aa-d6e1ade3d11d']}\n",
      "{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\\n  \"countries\":', additional_kwargs={}, response_metadata={}, id='run-75c6e43d-c883-4e23-b7e4-bd8be694f1c0')}, 'run_id': '75c6e43d-c883-4e23-b7e4-bd8be694f1c0', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['b2377705-61a4-420c-a9aa-d6e1ade3d11d']}\n",
      "{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content=' [\\n    {\\n      \"name\": \"', additional_kwargs={}, response_metadata={}, id='run-75c6e43d-c883-4e23-b7e4-bd8be694f1c0')}, 'run_id': '75c6e43d-c883-4e23-b7e4-bd8be694f1c0', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['b2377705-61a4-420c-a9aa-d6e1ade3d11d']}\n",
      "{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='France\",\\n      \"population\": 67391', additional_kwargs={}, response_metadata={}, id='run-75c6e43d-c883-4e23-b7e4-bd8be694f1c0')}, 'run_id': '75c6e43d-c883-4e23-b7e4-bd8be694f1c0', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['b2377705-61a4-420c-a9aa-d6e1ade3d11d']}\n",
      "{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='582\\n    },', additional_kwargs={}, response_metadata={}, id='run-75c6e43d-c883-4e23-b7e4-bd8be694f1c0')}, 'run_id': '75c6e43d-c883-4e23-b7e4-bd8be694f1c0', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['b2377705-61a4-420c-a9aa-d6e1ade3d11d']}\n",
      "{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\\n    {\\n      \"', additional_kwargs={}, response_metadata={}, id='run-75c6e43d-c883-4e23-b7e4-bd8be694f1c0')}, 'run_id': '75c6e43d-c883-4e23-b7e4-bd8be694f1c0', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['b2377705-61a4-420c-a9aa-d6e1ade3d11d']}\n",
      "{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='name\": \"Spain\",\\n      \"population\":', additional_kwargs={}, response_metadata={}, id='run-75c6e43d-c883-4e23-b7e4-bd8be694f1c0')}, 'run_id': '75c6e43d-c883-4e23-b7e4-bd8be694f1c0', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['b2377705-61a4-420c-a9aa-d6e1ade3d11d']}\n",
      "{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content=' 47615034\\n    },\\n    ', additional_kwargs={}, response_metadata={}, id='run-75c6e43d-c883-4e23-b7e4-bd8be694f1c0')}, 'run_id': '75c6e43d-c883-4e23-b7e4-bd8be694f1c0', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['b2377705-61a4-420c-a9aa-d6e1ade3d11d']}\n",
      "{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='{\\n      \"name\": \"Korea\",', additional_kwargs={}, response_metadata={}, id='run-75c6e43d-c883-4e23-b7e4-bd8be694f1c0')}, 'run_id': '75c6e43d-c883-4e23-b7e4-bd8be694f1c0', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['b2377705-61a4-420c-a9aa-d6e1ade3d11d']}\n",
      "...\n"
     ]
    }
   ],
   "source": [
    "chain = model.with_config({\"run_name\": \"model\"}) | JsonOutputParser().with_config(\n",
    "    {\"run_name\": \"my_parser\"}\n",
    ")\n",
    "\n",
    "max_events = 0\n",
    "async for event in chain.astream_events(\n",
    "    'output a list of the countries france, spain and korea and their populations in JSON format. Use a dict with an outer key of \"countries\" which contains a list of countries. Each country should have the key `name` and `population`',\n",
    "    version=\"v2\",\n",
    "    include_types=[\"chat_model\"],\n",
    "):\n",
    "    print(event)\n",
    "    max_events += 1\n",
    "    if max_events > 10:\n",
    "        # Truncate output\n",
    "        print(\"...\")\n",
    "        break"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### By Tags\n",
    "Tags in a runnable are inherited by all its child components.\n",
    "\n",
    "This means that if you assign tags to a parent component, those tags will automatically apply to its children as well.\n",
    "\n",
    "For example:\n",
    "- If a parent runnable has the tag [\"filter\"], all its child components will also inherit this tag.\n",
    "- This can affect behavior if you're using tags to filter or track specific components.\n",
    "\n",
    "What You Should Keep in Mind:\n",
    "- Filtering Tags: If you're using tags to filter components, make sure this automatic inheritance is what you want.\n",
    "- Overlapping Tags: Be cautious if child components already have their own tags, as inherited tags may lead to unintended behavior.\n",
    "- Intentional Tagging: If you don't want tags to apply to certain child components, you may need to adjust the tagging logic manually.\n",
    "\n",
    "By understanding how tags work, you can ensure they align with your desired functionality! "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'event': 'on_chain_start', 'data': {'input': 'output a list of the countries france, spain and korea and their populations in JSON format. Use a dict with an outer key of \"countries\" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'RunnableSequence', 'tags': ['my_chain'], 'run_id': '20a14e39-922c-47d9-ac1b-773756da2a3e', 'metadata': {}, 'parent_ids': []}\n",
      "{'event': 'on_chat_model_start', 'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and korea and their populations in JSON format. Use a dict with an outer key of \"countries\" which contains a list of countries. Each country should have the key `name` and `population`', additional_kwargs={}, response_metadata={})]]}}, 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'run_id': '46af8147-b0d5-4ce1-b472-c58207efa78f', 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['20a14e39-922c-47d9-ac1b-773756da2a3e']}\n",
      "{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-46af8147-b0d5-4ce1-b472-c58207efa78f', usage_metadata={'input_tokens': 56, 'output_tokens': 1, 'total_tokens': 57, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})}, 'run_id': '46af8147-b0d5-4ce1-b472-c58207efa78f', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['20a14e39-922c-47d9-ac1b-773756da2a3e']}\n",
      "{'event': 'on_parser_start', 'data': {}, 'name': 'JsonOutputParser', 'tags': ['seq:step:2', 'my_chain'], 'run_id': '50351c06-2ea6-4045-b70d-a2e9de241353', 'metadata': {}, 'parent_ids': ['20a14e39-922c-47d9-ac1b-773756da2a3e']}\n",
      "{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='{', additional_kwargs={}, response_metadata={}, id='run-46af8147-b0d5-4ce1-b472-c58207efa78f')}, 'run_id': '46af8147-b0d5-4ce1-b472-c58207efa78f', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['20a14e39-922c-47d9-ac1b-773756da2a3e']}\n",
      "{'event': 'on_parser_stream', 'run_id': '50351c06-2ea6-4045-b70d-a2e9de241353', 'name': 'JsonOutputParser', 'tags': ['seq:step:2', 'my_chain'], 'metadata': {}, 'data': {'chunk': {}}, 'parent_ids': ['20a14e39-922c-47d9-ac1b-773756da2a3e']}\n",
      "{'event': 'on_chain_stream', 'run_id': '20a14e39-922c-47d9-ac1b-773756da2a3e', 'name': 'RunnableSequence', 'tags': ['my_chain'], 'metadata': {}, 'data': {'chunk': {}}, 'parent_ids': []}\n",
      "{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\\n  \"countries\": [\\n    {', additional_kwargs={}, response_metadata={}, id='run-46af8147-b0d5-4ce1-b472-c58207efa78f')}, 'run_id': '46af8147-b0d5-4ce1-b472-c58207efa78f', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['20a14e39-922c-47d9-ac1b-773756da2a3e']}\n",
      "{'event': 'on_parser_stream', 'run_id': '50351c06-2ea6-4045-b70d-a2e9de241353', 'name': 'JsonOutputParser', 'tags': ['seq:step:2', 'my_chain'], 'metadata': {}, 'data': {'chunk': {'countries': [{}]}}, 'parent_ids': ['20a14e39-922c-47d9-ac1b-773756da2a3e']}\n",
      "{'event': 'on_chain_stream', 'run_id': '20a14e39-922c-47d9-ac1b-773756da2a3e', 'name': 'RunnableSequence', 'tags': ['my_chain'], 'metadata': {}, 'data': {'chunk': {'countries': [{}]}}, 'parent_ids': []}\n",
      "{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\\n      \"name\": \"France\",\\n      \"', additional_kwargs={}, response_metadata={}, id='run-46af8147-b0d5-4ce1-b472-c58207efa78f')}, 'run_id': '46af8147-b0d5-4ce1-b472-c58207efa78f', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-5-sonnet-latest', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['20a14e39-922c-47d9-ac1b-773756da2a3e']}\n",
      "...\n"
     ]
    }
   ],
   "source": [
    "chain = (model | JsonOutputParser()).with_config({\"tags\": [\"my_chain\"]})\n",
    "\n",
    "max_events = 0\n",
    "async for event in chain.astream_events(\n",
    "    'output a list of the countries france, spain and korea and their populations in JSON format. Use a dict with an outer key of \"countries\" which contains a list of countries. Each country should have the key `name` and `population`',\n",
    "    version=\"v2\",\n",
    "    include_tags=[\"my_chain\"],\n",
    "):\n",
    "    print(event)\n",
    "    max_events += 1\n",
    "    if max_events > 10:\n",
    "        # Truncate output\n",
    "        print(\"...\")\n",
    "        break"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Handling Non-Streaming Components\n",
    "Some components don’t work well with input streams and may interrupt the streaming of the final output when using astream.\n",
    "\n",
    "However, when using astream_events, you can still get streaming events from intermediate steps that support streaming!   \n",
    "This allows partial progress to be tracked, even if some components are not compatible with streaming."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Function that does not support streaming.\n",
    "# It operates on the finalizes inputs rather than\n",
    "# operating on the input stream.\n",
    "def _extract_country_names(inputs):\n",
    "    \"\"\"A function that does not operates on input streams and breaks streaming.\"\"\"\n",
    "    if not isinstance(inputs, dict):\n",
    "        return \"\"\n",
    "\n",
    "    if \"countries\" not in inputs:\n",
    "        return \"\"\n",
    "\n",
    "    countries = inputs[\"countries\"]\n",
    "\n",
    "    if not isinstance(countries, list):\n",
    "        return \"\"\n",
    "\n",
    "    country_names = [\n",
    "        country.get(\"name\") for country in countries if isinstance(country, dict)\n",
    "    ]\n",
    "    return country_names\n",
    "\n",
    "\n",
    "chain = (\n",
    "    model | JsonOutputParser() | _extract_country_names\n",
    ") "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As expected, the astream API doesn't work correctly because ```_extract_country_names``` doesn't operate on streams."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "['France', 'Spain', 'Korea']\n"
     ]
    }
   ],
   "source": [
    "async for chunk in chain.astream(\n",
    "    \"output a list of the countries france, spain and korea and their populations in JSON format. \"\n",
    "    'Use a dict with an outer key of \"countries\" which contains a list of countries. '\n",
    "    \"Each country should have the key `name` and `population`\",\n",
    "):\n",
    "    print(chunk, flush=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now, let's confirm that with astream_events we're still seeing streaming output from the model and the parser."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Chat model chunk: ''\n",
      "Chat model chunk: '{'\n",
      "Parser chunk: {}\n",
      "Chat model chunk: '\\n  \"countries\": [\\n    {'\n",
      "Parser chunk: {'countries': [{}]}\n",
      "Chat model chunk: '\\n      \"name\": \"France\",\\n      \"'\n",
      "Parser chunk: {'countries': [{'name': 'France'}]}\n",
      "Chat model chunk: 'population\": 67391582\\n    },'\n",
      "Parser chunk: {'countries': [{'name': 'France', 'population': 67391582}]}\n",
      "Chat model chunk: '\\n    {\\n      \"name\": \"Spain\",'\n",
      "Parser chunk: {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain'}]}\n",
      "Chat model chunk: '\\n      \"population\": 47615'\n",
      "Parser chunk: {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615}]}\n",
      "Chat model chunk: '034\\n    },\\n    {\\n      \"name'\n",
      "Parser chunk: {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615034}, {}]}\n",
      "Chat model chunk: '\": \"Korea\",\\n      \"population\": '\n",
      "Parser chunk: {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615034}, {'name': 'Korea'}]}\n",
      "Chat model chunk: '51744876\\n    }\\n  '\n",
      "Parser chunk: {'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47615034}, {'name': 'Korea', 'population': 51744876}]}\n",
      "Chat model chunk: ']\\n}'\n",
      "Chat model chunk: ''\n",
      "...\n"
     ]
    }
   ],
   "source": [
    "num_events = 0\n",
    "\n",
    "async for event in chain.astream_events(\n",
    "    \"output a list of the countries france, spain and korea and their populations in JSON format. \"\n",
    "    'Use a dict with an outer key of \"countries\" which contains a list of countries. '\n",
    "    \"Each country should have the key `name` and `population`\",\n",
    "    version=\"v2\",\n",
    "):\n",
    "    kind = event[\"event\"]\n",
    "    if kind == \"on_chat_model_stream\":\n",
    "        print(\n",
    "            f\"Chat model chunk: {repr(event['data']['chunk'].content)}\",\n",
    "            flush=True,\n",
    "        )\n",
    "    if kind == \"on_parser_stream\":\n",
    "        print(f\"Parser chunk: {event['data']['chunk']}\", flush=True)\n",
    "    num_events += 1\n",
    "    if num_events > 30:\n",
    "        # Truncate the output\n",
    "        print(\"...\")\n",
    "        break"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Propagating Callbacks\n",
    "When invoking runnables inside your tools, you must manually propagate callbacks to the runnable. Without this, no stream events will be generated.\n",
    "\n",
    "If you're using ```RunnableLambdas``` or the ```@chain``` decorator, callbacks are automatically propagated in the background."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "\n",
    "This example shows how callbacks are not propagated in a custom tool (bad_tool), resulting in incomplete event tracking and missing streaming events (on_chain_stream)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 41,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'event': 'on_tool_start', 'data': {'input': 'hello'}, 'name': 'bad_tool', 'tags': [], 'run_id': 'f179cc39-a3d2-4fb3-83f7-7574dfecf2f2', 'metadata': {}, 'parent_ids': []}\n",
      "{'event': 'on_chain_start', 'data': {'input': 'hello'}, 'name': 'reverse_word', 'tags': [], 'run_id': '75693727-ebbc-4cd3-9142-700e8e54a6b3', 'metadata': {}, 'parent_ids': ['f179cc39-a3d2-4fb3-83f7-7574dfecf2f2']}\n",
      "{'event': 'on_chain_end', 'data': {'output': 'olleh', 'input': 'hello'}, 'run_id': '75693727-ebbc-4cd3-9142-700e8e54a6b3', 'name': 'reverse_word', 'tags': [], 'metadata': {}, 'parent_ids': ['f179cc39-a3d2-4fb3-83f7-7574dfecf2f2']}\n",
      "{'event': 'on_tool_end', 'data': {'output': 'olleh'}, 'run_id': 'f179cc39-a3d2-4fb3-83f7-7574dfecf2f2', 'name': 'bad_tool', 'tags': [], 'metadata': {}, 'parent_ids': []}\n"
     ]
    }
   ],
   "source": [
    "from langchain_core.runnables import RunnableLambda\n",
    "from langchain_core.tools import tool\n",
    "\n",
    "def reverse_word(word: str):\n",
    "    return word[::-1]\n",
    "\n",
    "reverse_word = RunnableLambda(reverse_word)\n",
    "\n",
    "@tool\n",
    "def bad_tool(word: str):\n",
    "    \"\"\"Custom tool that doesn't propagate callbacks.\"\"\"\n",
    "    return reverse_word.invoke(word)\n",
    "\n",
    "\n",
    "async for event in bad_tool.astream_events(\"hello\", version=\"v2\"):\n",
    "    print(event)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If you're invoking runnables from within ```RunnableLambda``` or ```@chains```, callbacks are handled automatically.\n",
    "\n",
    "This means:\n",
    "- You don’t need to manually pass callbacks between runnables.\n",
    "- Events like on_chain_start, on_chain_end, and even streaming events (on_chain_stream) are seamlessly emitted and linked."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_and_double', 'tags': [], 'run_id': 'bba3e8c4-7e4c-4daa-8c45-03f45a30913d', 'metadata': {}, 'parent_ids': []}\n",
      "{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_word', 'tags': [], 'run_id': '883ad65d-cebb-41ef-8ce8-718c4a310521', 'metadata': {}, 'parent_ids': ['bba3e8c4-7e4c-4daa-8c45-03f45a30913d']}\n",
      "{'event': 'on_chain_end', 'data': {'output': '4321', 'input': '1234'}, 'run_id': '883ad65d-cebb-41ef-8ce8-718c4a310521', 'name': 'reverse_word', 'tags': [], 'metadata': {}, 'parent_ids': ['bba3e8c4-7e4c-4daa-8c45-03f45a30913d']}\n",
      "{'event': 'on_chain_stream', 'run_id': 'bba3e8c4-7e4c-4daa-8c45-03f45a30913d', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}, 'data': {'chunk': '43214321'}, 'parent_ids': []}\n",
      "{'event': 'on_chain_end', 'data': {'output': '43214321'}, 'run_id': 'bba3e8c4-7e4c-4daa-8c45-03f45a30913d', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}, 'parent_ids': []}\n"
     ]
    }
   ],
   "source": [
    "from langchain_core.runnables import RunnableLambda\n",
    "\n",
    "async def reverse_and_double(word: str):\n",
    "    return await reverse_word.ainvoke(word) * 2\n",
    "\n",
    "reverse_and_double = RunnableLambda(reverse_and_double)\n",
    "\n",
    "async for event in reverse_and_double.astream_events(\"1234\", version=\"v2\"):\n",
    "    print(event)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "And with the ```@chain``` decorator:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 45,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_and_double', 'tags': [], 'run_id': 'bc7ff368-3ac9-48be-a18a-882441d1c065', 'metadata': {}, 'parent_ids': []}\n",
      "{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_word', 'tags': [], 'run_id': 'be79cf18-29e6-4de7-acf6-45f51cd101e3', 'metadata': {}, 'parent_ids': ['bc7ff368-3ac9-48be-a18a-882441d1c065']}\n",
      "{'event': 'on_chain_end', 'data': {'output': '4321', 'input': '1234'}, 'run_id': 'be79cf18-29e6-4de7-acf6-45f51cd101e3', 'name': 'reverse_word', 'tags': [], 'metadata': {}, 'parent_ids': ['bc7ff368-3ac9-48be-a18a-882441d1c065']}\n",
      "{'event': 'on_chain_stream', 'run_id': 'bc7ff368-3ac9-48be-a18a-882441d1c065', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}, 'data': {'chunk': '43214321'}, 'parent_ids': []}\n",
      "{'event': 'on_chain_end', 'data': {'output': '43214321'}, 'run_id': 'bc7ff368-3ac9-48be-a18a-882441d1c065', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}, 'parent_ids': []}\n"
     ]
    }
   ],
   "source": [
    "from langchain_core.runnables import chain\n",
    "\n",
    "@chain\n",
    "async def reverse_and_double(word: str):\n",
    "    return await reverse_word.ainvoke(word) * 2\n",
    "\n",
    "async for event in reverse_and_double.astream_events(\"1234\", version=\"v2\"):\n",
    "    print(event)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "langchain-opentutorial-VHYpHY_j-py3.11",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.11"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
